package org.anyline.sync;

import org.anyline.data.entity.Column;
import org.anyline.data.entity.Table;
import org.anyline.data.jdbc.ds.DataSourceHolder;
import org.anyline.data.param.ConfigStore;
import org.anyline.data.param.init.DefaultConfigStore;
import org.anyline.entity.DataRow;
import org.anyline.entity.DataSet;
import org.anyline.service.AnylineService;
import org.anyline.util.BasicUtil;
import org.anyline.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.*;

@Component
public class SyncTask {
    private static Logger log = LoggerFactory.getLogger(SyncTask.class);
    @Autowired
    private AnylineService service;
    private static Map<String,Date> exe_times = new HashMap<>();

    @Value("${anyline.sync.task:}")
    private List<String> tasks;

    /*
     * 数据库配置说明
     * BS_DATASOURCE:配置数据源  注意帐号密码URL等敏感信息最好经过非对称加密,把私钥放在jar中再把jar加密
     *      CODE                    : 数据源标识,后续将根据这个值切换数据源
     *      DRIVER                  : 驱动类(根据数据库类型,如:com.mysql.cj.jdbc.Driver)
     *      URL                     : JDBC连接
     *      ACCOUNT                 : 帐号
     *      PASSWORD                : 密码
     *      TYPE_CLASS              : 连接池类(默认:com.zaxxer.hikari.HikariDataSource)
     * SYNC_TASK:同步配置
     *      TRIGGER_TYPE_CODE       : 触发方式 0:定时 1:被动
     *      DATA_STATUS             : 状态 0:不可用 不会执行 1:可用
     *      TYPE_CODE               : 0:全量(先truncate目标表再insert)
     *                                1:增量
     *                                2:更新
     *      INTERVALS               : 执行间隔时间(秒)
     *      SCHEDULE_ID             : 定时器(如果任务本身就在定时器中不要设置这个值)
     *      SRC_DATASOURCE_CODE     : 源数据源(BS_DATASOURCE.CODE)
     *      SRC_TABLE               : 源表
     *                                如果有多个表以,分隔,同时保持与TAR_TABLE中的表数量一致
     *                                如果要同步整个数据库可以SRC_TABLE设置成*(TAR_TABLE值将被忽略)
     *                                *-T1,T2 表示同步除了T1,T2的所有表
     *
     *      TAR_DATASOURCE_CODE     : 目标数据源
     *      TAR_TABLE               : 目标表(SRC_TABLE中出现*时 忽略当前值)
     *      COLS                    : 源表与目标表的 列对应关系(源列名:目标列名)
     *                                如ID:CD,NAME:NM,如果有一样列可以ID,NAME:NM,*表示全部
     *                                如果SRC_TABLE中设置了多个值或*,忽略当前值
     *      ORDER_COLUMN            : 排序列
     *      PK                      : 在更新模式下用到,根据主键更新目标表(单表时有效，多表时自动检测)
     *
     *      STEP                    : 每次同步多少行
     *      LAST_EXE_TIME           : 最后执行时间(注意多表的情况)
     *      LAST_EXE_VALUE          : 最后同步的主键值(注意多表的情况)
     *      LAST_EXE_QTY            : 最后一次同步了多少行(注意多表的情况)
     *      EXE_QTY                 : 一共同步了多行行(注意多表的情况)
     *
     *      AFTER_SQL_TAR           : 任务完成后执行的SQL 支持的变量${table}${max}
     *      AFTER_SQL_SRC           : 任务完成后执行的SQL
     */
    @Scheduled(cron="0 0/5 * * * ?")
    //@Scheduled(cron="* * * * * ?")
    private synchronized void scan(){
        DataSourceHolder.setDefaultDataSource();
        //注册数据源
        DataSet dss = service.querys("bs_datasource");
        for(DataRow ds:dss){
            String code = ds.getCode();
            String driver = ds.getString("DRIVER");
            String url = ds.getString("URL");
            String account = ds.getString("ACCOUNT");
            String password = ds.getString("PASSWORD");
            String pool = ds.getStringEvl("POOL_CLASS","com.zaxxer.hikari.HikariDataSource");
            try {
                if(!DataSourceHolder.contains(code)) {
                    DataSourceHolder.reg(code, pool, driver, url, account, password);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        //检测同步任务
        ConfigStore conditions = new DefaultConfigStore();
        if(null != this.tasks && this.tasks.size()>0){
            conditions.ands("CODE", tasks);
        }
        DataSet tasks = service.querys("sync_task",conditions,"TRIGGER_TYPE_CODE:0","DATA_STATUS:1", "ORDER BY TYPE_CODE");
        for(DataRow task:tasks){
            try {
                exe(task);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 执行同步(0:全量  1:增量  2:更新)
     * @param task 任务
     * @return 同步行数，-1:未执行
     */
    private int exe(DataRow task){
        log.warn("exe task:{}", task.getJson());
        DataSourceHolder.setDefaultDataSource();
        int type = task.getInt("TYPE_CODE",0);  //0:全量  1:增量  2:更新
        int interval = task.getInt("INTERVALS",0);
        int scheduleId = task.getInt("SCHEDULE_ID",0);
        if(scheduleId > 0){
            DataRow  schedule = null;
            schedule = service.query("bs_schedule","++ID:"+scheduleId);
            if(interval <= 0 && null == schedule){
                log.error("[任务异常][间隔时间异常或定时任务异常]");
                return -1;
            }
        }

        String srcTable = task.getString("SRC_TABLE");
        String srcDataSource = task.getString("SRC_DATASOURCE_CODE");
        String tarTable = task.getString("TAR_TABLE");
        String tarDataSource = task.getString("TAR_DATASOURCE_CODE");
        Date lastExeTime =  exe_times.get(srcDataSource+"."+srcTable);

        if(null != lastExeTime
                && DateUtil.diff(DateUtil.DATE_PART_SECOND, lastExeTime) < interval){
            log.warn("[sync start][未达到间隔时间][src:{}.{}][tar:{}.{}][last:{}][interval:{}]", srcDataSource, srcTable, tarDataSource, tarTable, lastExeTime,interval);
            return -1;
        }

        if(BasicUtil.isEmpty(srcTable)){
            log.error("未设置源表:{}", task.toJSON());
        }
        DataSourceHolder.setDataSource(srcDataSource);
        if(srcTable.contains("*")){
            List<String> tables = service.tables();
            if(srcTable.contains("-")){
                String[] ignores = srcTable.split("-")[1].split(",");
                for(String item:ignores) {
                    tables.remove(item);
                }
            }
            for(String table:tables){
                task.put("SRC_TABLE", table);
                task.put("TAR_TABLE", table);
                task.put("COLS","*");
                task.put("LAST_EXE_TIME", exe_times.get(srcDataSource+"."+table));
                exe(task);
            }
            return 0;
        }else if(srcTable.contains(",")){
            String[] srcs = srcTable.split(",");
            String[] tars = tarTable.split(",");
            if(srcs.length != tars.length){
                log.error("源表与目标表数据不一致,[src:{}],[tar:{}]", srcTable, tarTable);
                return -1;
            }
            int size = srcs.length;
            for(int i=0; i<size; i++){
                task.put("SRC_TABLE", srcs[i]);
                task.put("TAR_TABLE", tars[i]);
                task.put("COLS","*");
                exe(task);
            }
            return 0;
        }


        int qty = 0;//同步数量

        String queryCols = null;
        String[] cols = null;
        Map<String,String> cols_map = new HashMap<>(); //列对应关系 src:tar
        //源列名:目标列名
        String configCos = task.getStringNvl("COLS");
        if(BasicUtil.isNotEmpty(configCos)) {
            if (configCos.contains(",")) {
                cols = task.getString("COLS").split(",");
                for (String col : cols) {
                    String src_col = col;
                    String tar_col = col;
                    String[] tmps = col.split(":");
                    if (tmps.length > 1) {
                        src_col = tmps[0];
                        tar_col = tmps[0];
                    }
                    if (null == queryCols) {
                        queryCols = src_col;
                    } else {
                        queryCols += "," + src_col;
                    }
                    cols_map.put(src_col, tar_col);
                }
            }else{
                queryCols = configCos;
            }
            String orderKey = task.getString("ORDER_COLUMN");

            if(BasicUtil.isNotEmpty(orderKey) && !queryCols.contains(orderKey) && !queryCols.equals("*")){
                queryCols += ","+orderKey;
            }
        }else{
            queryCols = "*";
        }

        int step = task.getInt("STEP",100);
        String orderKey = task.getStringNvl("ORDER_COLUMN");
        if(BasicUtil.isEmpty(orderKey)){
            DataSourceHolder.setDataSource(srcDataSource);
            Table table = service.metadata().table(srcTable);
            Column col = table.primary();
            if(null != col) {
                orderKey = col.getName();
            }
        }

        String max = "0";
        int exeQty = 0;
        String fr = DateUtil.format();
        if(type == 0){//全量
            max = "0"; //最后同步的主键值(清空从0开始)
            DataSourceHolder.setDataSource(tarDataSource);
            log.warn("[sync start][src:{}.{}][tar:{}.{}]", srcDataSource, srcTable, tarDataSource, tarTable);
            service.execute("TRUNCATE TABLE " + tarTable);
            int tarQty = task.getInt("EXE_QTY",0);
            DataSourceHolder.setDataSource(srcDataSource);
            int srcQty = service.count(srcTable);
            //根据总数量 避免数据源清空时 同步数据
            if(srcQty < tarQty*0.9){
                log.warn("[sync start][数据源更新中][src:{}.{}][tar:{}.{}][src qty:{}]", srcDataSource, srcTable, tarDataSource, tarTable, srcQty);
                return -1;
            }
            while(true){
                try {
                    DataSourceHolder.setDataSource(srcDataSource);
                    DataSet set = service.querys(srcTable + "(" + queryCols + ")"
                            , 0, step - 1
                            , orderKey + ":>" + max, "ORDER BY " + orderKey);
                    max = set.getString(set.size() - 1, orderKey, "0");
                    if (set.size() == 0) {
                        break;
                    }
                    exeQty += set.size();
                    set = format(set,cols);
                    DataSourceHolder.setDataSource(tarDataSource);
                    service.insert(tarTable, set);
                    qty += set.size();
                    log.warn("[sync start][src:{}.{}][tar:{}.{}][qty:{}]", srcDataSource, srcTable, tarDataSource, tarTable, qty);

                    DataSourceHolder.setDefaultDataSource();
                    task.put("LAST_EXE_VALUE", max);
                    service.update(task,"LAST_EXE_QTY","LAST_EXE_TIME");


                    //执行后SQL
                    //${table} ${max} ${order}
                    String after = task.getString("AFTER_SQL_TAR");
                    if(BasicUtil.isNotEmpty(after)){
                        DataSourceHolder.setDataSource(tarDataSource);
                        after = after.replace("${table}", task.getString("TAR_TABLE"));
                        after = after.replace("${max}", max);
                        after = after.replace("${order}", orderKey);
                        service.execute(after);
                    }
                    after = task.getString("AFTER_SQL_SRC");
                    if(BasicUtil.isNotEmpty(after)){
                        DataSourceHolder.setDataSource(srcDataSource);
                        after = after.replace("${table}", task.getString("SRC_TABLE"));
                        after = after.replace("${max}", max);
                        after = after.replace("${order}", orderKey);
                        service.execute(after);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    DataSourceHolder.setDefaultDataSource();
                    task.put("EXE_EXCEPTION",  DateUtil.format()+":"+e.getMessage());
                    service.save(task,"LAST_EXE_QTY","LAST_EXE_TIME","EXE_EXCEPTION");
                }
            }

        }else if(type == 1){//增量
            if(BasicUtil.isEmpty(orderKey)){
                log.warn("[未设置排序列][table:{}]", srcTable);
                return -1;
            }
            log.warn("[sync start][src:{}.{}][tar:{}.{}]", srcDataSource, srcTable, tarDataSource, tarTable);
            //到目标表中查询最大值
            DataSourceHolder.setDataSource(tarDataSource);
            DataRow checkMax = service.query(tarTable+"(max("+orderKey+") AS "+orderKey+")");
            if(null != checkMax){
                max = checkMax.getString(orderKey);
            }

            while (true){
                try {
                    DataSourceHolder.setDataSource(srcDataSource);
                    DataSet set = service.querys(srcTable + "(" + queryCols + ")", 0, step - 1
                            , orderKey + ":>" + max
                            , "ORDER BY " + orderKey);
                    max = set.getString(set.size() - 1, orderKey, "0");
                    if (set.size() == 0) {
                        break;
                    }
                    exeQty += set.size();
                    set =  format(set, cols);
                    Thread.sleep(10);
                    DataSourceHolder.setDataSource(tarDataSource);
                    service.insert(tarTable, set);
                    qty += set.size();
                    log.warn("[sync start][src:{}.{}][tar:{}.{}][qty:{}]", srcDataSource, srcTable, tarDataSource, tarTable, qty);

                    Thread.sleep(10);
                    DataSourceHolder.setDefaultDataSource();
                    task.put("LAST_EXE_VALUE", max);
                    service.update(task,"LAST_EXE_QTY","LAST_EXE_TIME");

                    //执行后SQL
                    //${table} ${max} ${order}
                    String[] afters = task.getStringNvl("AFTER_SQL_TAR").split(";");
                    for(String after:afters) {
                        if (BasicUtil.isNotEmpty(after)) {
                            DataSourceHolder.setDataSource(tarDataSource);
                            after = after.replace("${table}", task.getString("TAR_TABLE"));
                            after = after.replace("${max}", max);
                            after = after.replace("${order}", orderKey);
                            service.execute(after);
                        }
                    }
                    afters = task.getStringNvl("AFTER_SQL_SRC").split(";");
                    for(String after:afters) {
                        if (BasicUtil.isNotEmpty(after)) {
                            DataSourceHolder.setDataSource(srcDataSource);
                            after = after.replace("${table}", task.getString("SRC_TABLE"));
                            after = after.replace("${max}", max);
                            after = after.replace("${order}", orderKey);
                            service.execute(after);
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    DataSourceHolder.setDefaultDataSource();
                    task.put("EXE_EXCEPTION", e.getMessage());
                    service.save(task,"LAST_EXE_QTY","LAST_EXE_TIME","EXE_EXCEPTION");
                }
            }
        }else if(type == 2){//更新
            max = "0"; //最后同步的主键值(清空从0开始)
            DataSourceHolder.setDataSource(tarDataSource);
            log.warn("[sync start][src:{}.{}][tar:{}.{}]", srcDataSource, srcTable, tarDataSource, tarTable);
            int tarQty = task.getInt("EXE_QTY",0);
            DataSourceHolder.setDataSource(srcDataSource);
            int srcQty = service.count(srcTable);
            //根据总数量 避免数据源清空时 同步数据
            if(srcQty < tarQty*0.9){
                log.warn("[sync start][数据源更新中][src:{}.{}][tar:{}.{}][src qty:{}]", srcDataSource, srcTable, tarDataSource, tarTable, srcQty);
                return -1;
            }
            while(true){
                try {
                    DataSourceHolder.setDataSource(srcDataSource);
                    DataSet set = service.querys(srcTable + "(" + queryCols + ")"
                            , 0, step - 1
                            , orderKey + ":>" + max, "ORDER BY " + orderKey);
                    max = set.getString(set.size() - 1, orderKey, "0");
                    if (set.size() == 0) {
                        break;
                    }
                    exeQty += set.size();
                    set = format(set,cols);
                    DataSourceHolder.setDataSource(tarDataSource);

                    for(String src_col:cols_map.keySet()){
                        String tar_col = cols_map.get(src_col);
                        set.changeKey(src_col, tar_col);
                    }
                    set.setPrimaryKey(task.getString("PK"));
                    service.update(tarTable, set);
                    qty += set.size();
                    log.warn("[sync start][src:{}.{}][tar:{}.{}][qty:{}]", srcDataSource, srcTable, tarDataSource, tarTable, qty);

                    DataSourceHolder.setDefaultDataSource();
                    task.put("LAST_EXE_VALUE", max);
                    service.update(task,"LAST_EXE_QTY","LAST_EXE_TIME");

                    //执行后SQL
                    //${table} ${max} ${order}
                    String after = task.getString("AFTER_SQL_TAR");
                    if(BasicUtil.isNotEmpty(after)){
                        DataSourceHolder.setDataSource(tarDataSource);
                        after = after.replace("${table}", task.getString("TAR_TABLE"));
                        after = after.replace("${max}", max);
                        after = after.replace("${order}", orderKey);
                        service.execute(after);
                    }
                    after = task.getString("AFTER_SQL_SRC");
                    if(BasicUtil.isNotEmpty(after)){
                        DataSourceHolder.setDataSource(srcDataSource);
                        after = after.replace("${table}", task.getString("SRC_TABLE"));
                        after = after.replace("${max}", max);
                        after = after.replace("${order}", orderKey);
                        service.execute(after);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    DataSourceHolder.setDefaultDataSource();
                    task.put("EXE_EXCEPTION",  DateUtil.format()+":"+e.getMessage());
                    service.save(task,"LAST_EXE_QTY","LAST_EXE_TIME","EXE_EXCEPTION");
                }
            }
        }else {
            log.error("未设置同步类型:TYPE_CODE(0:全量,1:增量,2:更新)");
            return -1;
        }
        DataSourceHolder.setDefaultDataSource();

        String to = DateUtil.format();
        DataRow record = new DataRow();
        record.put("EXE_TIME_FR", fr);
        record.put("EXE_TIME_TO", to);
        record.put("SRC_DATASOURCE_CODE", srcDataSource);
        record.put("SRC_TABLE", srcTable);
        record.put("TAR_DATASOURCE_CODE", tarDataSource);
        record.put("TAR_TABLE", tarTable);
        record.put("EXE_QTY", exeQty);
        record.put("TYPE_CODE",type);  //0全量  1增量 2更新
        service.insert(" sync_record", record);


        task.put("LAST_EXE_QTY", qty);
        task.put("LAST_EXE_TIME", DateUtil.format("yyyy-MM-dd HH:mm:ss"));
        exe_times.put(srcDataSource+"."+srcTable, new Date());
        service.update(task,"LAST_EXE_QTY","LAST_EXE_TIME");
        return qty;
    }
    private DataSet format(DataSet set, String ... cols){
        set.replaceEmpty("NULL");
        set.replaces("\\","\\\\").replaces("'","''");
        List<String> inserts = new ArrayList<>();
        if (null != cols) {
            for (String col : cols) {
                String[] tmps = col.split(":");
                if (tmps.length > 1) {
                    if(!tmps[0].equalsIgnoreCase(tmps[1])) {
                        set.changeKey(tmps[0], tmps[1], true);
                    }
                    inserts.add(tmps[1]);
                }else{
                    inserts.add(col);
                }
            }
        }else{
            return set;
        }
        return set.extract(inserts);
    }
}
